feat: expose spark-compatible functions#1564
Conversation
Add `datafusion.functions.spark` module exposing the upstream `datafusion-spark` crate's UDF/UDAF library (~87 functions across string, math, datetime, hash, array, aggregate, bitwise, bitmap, conditional, collection, conversion, json, map, url categories). For DataFrame use, import the typed Python wrappers from `datafusion.functions.spark`. For SQL use, call `SessionContext.enable_spark_functions()` to register the Spark UDFs by name (overriding DataFusion built-ins of the same name with their Spark semantics — NULL-propagating `concat`, 1-indexed `substring`, HALF_UP `round`, etc.). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Seven `#[allow(clippy::borrow_deref_ref)]` attributes on module declarations in `crates/core/src/lib.rs` had become stale — the only remaining lint hit was a redundant `&*x.as_str()` pattern in `parse_file_compression_type`. Rewriting that call to `&x.unwrap_or_default()` lets every allow come off, removing noise that new modules were copying without need. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Switch most spark wrappers from UDF-direct path (which forced `spark_udf_fixed!(name, fn_category::name, args...)` repetition) to a `spark_expr_fn!` macro that mirrors the existing `expr_fn!` macro in `functions.rs`, so calls collapse to `spark_expr_fn!(sha2, arg1 bit_length);`. UDF-direct retained for genuinely variadic functions whose upstream `expr_fn` wrappers were generated with a single-`Expr` arm by `export_functions!` (concat, array, xxhash64, parse_url family, etc.) so that the Python side keeps its `*args` ergonomics. Aggregates collapse the same way via `spark_aggregate!` mirroring `aggregate_function!`. Net 173 lines removed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The intro wording implied "SQL functions" only; the same wrappers are the primary entry point for the DataFrame API as well. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace API-speak ("Import the submodule", "Returned values are Expr
instances that compose") with a concrete description of where users can
actually drop these calls.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hand-maintained category list would drift from the actual module as upstream `datafusion-spark` adds/removes functions. Replace with a pointer to the AutoAPI-generated reference, which renders from the module itself. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
38 wrappers carried `# doctest: +SKIP` because outputs weren't verified at authoring time. Run each with concrete inputs, capture actual outputs, and inline the values so the doctests execute and stay correct. Covers datetime (20), URL (5), bitmap (3), map (3), and remaining hash, JSON, math, string, conversion, and format_string cases. Net new doctest coverage: 65 examples now run that were skipped before; total skipped across the suite drops from 53 to 12. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Align positional parameter names in `functions.spark` with pyspark.sql.functions: - aggregate first positional → `col` (avg, try_sum, collect_list, collect_set) - unary `arg` → `col` across math/string/byte/datetime helpers - multi-arg renames: array_contains (col, value), array (*cols), shuffle (col), array_repeat (col, count), slice (x, start, length), shiftleft/right/rightunsigned (col, numBits), add_months (start, months), date_add/sub (start, days), date_diff (end, start), date_trunc (format, timestamp), time_trunc (unit, time), trunc (date, format), next_day (date, dayOfWeek), from/to_utc_timestamp (timestamp, tz), sha2 (col, numBits), xxhash64 (*cols), map_from_arrays (col1, col2), width_bucket (v, min, max, numBucket), substring (str, pos, len), concat (*cols), elt (*inputs), is_valid_utf8/make_valid_utf8 (str) Bodies updated to reference the new names; positional callers unaffected. This finishes Category 1 / Category 4 (spark-side BOTH-bucket) renames from PYSPARK_ALIGNMENT_PLAN.md PR 1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Match pyspark's optional-parameter surface in the spark namespace: - make_dt_interval, make_interval: all parts default to zero (int32 0 / lit 0.0) - str_to_map: pair_delim defaults to ',', key_value_delim defaults to ':' - round: scale defaults to 0 (HALF_UP rounding to nearest integer) - shuffle: accepts `seed` kwarg for pyspark parity; raises NotImplementedError for non-None values until the Rust binding supports it - like, ilike: accept `escapeChar` for pyspark parity; same NotImplementedError guard; first positional renamed `string` → `str` to match pyspark ceil/floor `scale=` deferred — the underlying Rust expr_fn is single-arg. Added a module-level `_ZERO_I32` literal to avoid rebuilding the pyarrow int32 zero scalar on every call. Tests: positional-compat coverage for aggregates (`spark.avg(col)` etc.), defaults-omitted cases for the optional-arg functions, and NotImplementedError cases for `shuffle(seed=)` and `like/ilike(escapeChar=)`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace generic ``*args`` with explicit pyspark-style signatures: - json_tuple(col, *fields) — first positional is the JSON expr - format_string(format, *cols) — `format` is the printf template; a plain ``str`` is auto-promoted to a literal - parse_url(url, partToExtract, key=None) — `key` is optional and only meaningful with ``partToExtract='QUERY'`` - try_parse_url(url, partToExtract, key=None) — same shape - url_decode(str), try_url_decode(str), url_encode(str) — single-argument forms (multi-arg calls were always semantically wrong) Tests cover the three-arg parse_url path and the plain-str format_string auto-promotion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`functions.spark` mirrors `pyspark.sql.functions` and now ships on this branch. Update every skill that references the function surface: - skills/datafusion_python/SKILL.md (user-facing): add an import reference, a Core Abstractions row, and a "Spark-Compatible Functions" subsection listing coverage by category, the SQL-vs-DataFrame usage (`enable_spark_functions`), and the divergent-semantics table (concat NULL, round HALF_UP, trunc) so callers know which namespace to pick. - .ai/skills/check-upstream/SKILL.md: new area for the `datafusion-spark` crate with the coverage policy (parity with pyspark, extras allowed when positional pyspark calls still work). Hygiene check also now spans `functions/spark.py`'s `__all__`. - .ai/skills/audit-skill-md/SKILL.md: add `functions.spark` to the surface table and a `spark-functions` scope so this audit also validates the new subsection and divergent-semantics table. - .ai/skills/make-pythonic/SKILL.md: explicit scope note that the spark namespace is a deliberate pyspark mirror — generic native-type coercion does not apply there. Path references updated to the new `functions/__init__.py` module layout. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The plan file is a working document, not a committed artifact, so skills must not point at it. Inline the one substantive reference (the "deferred to follow-up PRs" callout in make-pythonic) and drop the cross-cutting pointer from check-upstream. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous guidance said to skip the spark namespace entirely. That was wrong: the spark namespace should also feel pythonic — it just carries the extra constraint that every signature must remain compatible with pyspark.sql.functions (parameter names, positional order, accepted input types). Pythonic widenings like `Expr → Expr | int` are on-brand there because pyspark itself accepts the int form. Rewrite the scope section to spell out the compatibility rules (keep parameter names/order; widen input types, never narrow; extra kwargs default to None) and extend "How to Identify Candidates" to include `functions/spark.py`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| FileCompressionType::from_str(&file_compression_type.unwrap_or_default()).map_err(|_| { | ||
| PyValueError::new_err("file_compression_type must one of: gzip, bz2, xz, zstd") | ||
| }) |
There was a problem hiding this comment.
Drive by update to remove clippy warning that was specified at the module level.
Enumerating spark functions in the user-facing skill duplicates the __all__ list in python/datafusion/functions/spark.py and will drift the moment a new function lands or is renamed. Replace the per-function listing with a category summary and a discovery snippet that queries the actual __all__ at runtime, which is the authoritative source. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
pyarrow tzinfo repr differs across versions (<UTC> vs zoneinfo.ZoneInfo(key='UTC')), breaking the doctest on some platforms. isoformat is stable across versions. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts: # Cargo.toml # docs/source/user-guide/common-operations/functions.rst # docs/source/user-guide/common-operations/index.rst
The example called map_from_arrays, so it never exercised map_from_entries. Build an array-of-struct input and call the documented function. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add avg, try_sum, collect_list, and collect_set under a dedicated Spark-Compatible Functions entry, with a note that the datafusion.functions.spark namespace mirrors Spark semantics and may differ from the like-named built-ins. Adds a (spark-functions) anchor for the cross-reference. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Use the same lit-based single-row select and [0].as_py() accessor as the other wrappers instead of the lone to_pylist() call. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Audit the functions.spark namespace against pyspark.sql.functions and widen arguments that pyspark types as a non-column literal so callers can pass bare int/float/str instead of wrapping in lit(): - int args: array_repeat count, slice start/length, shiftleft/shiftright/ shiftrightunsigned numBits, sha2 numBits, round scale, substring pos/len, width_bucket numBucket - int32-coerced args (binding requires int32): add_months months, date_add/date_sub days, space n, make_dt_interval/make_interval parts - float args: modulus/pmod operands; make_*_interval secs - str args: next_day dayOfWeek, date_trunc/trunc format, date_part field, from_utc_timestamp/to_utc_timestamp tz, spark_cast type_str, json_tuple *fields - Any: array_contains value, if_ if_true/if_false Arguments that pyspark types as ColumnOrName (str means column name, not a literal) are left as Expr to avoid diverging from pyspark semantics: ilike/like pattern, parse_url partToExtract/key, str_to_map delimiters, bit_get pos, time_trunc unit. Also rename str_to_map's delimiter params to pairDelim/keyValueDelim to match pyspark exactly (they were pair_delim/key_value_delim). Add a coercion test matrix and update docstring examples to show the native-literal calling convention. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
For arguments that pyspark types as ColumnOrName, a bare str means a column name (not a literal). Widen these to Expr | str and resolve a str to a column reference via _to_raw_expr, matching pyspark semantics: - ilike/like pattern - parse_url/try_parse_url partToExtract and key - str_to_map pairDelim/keyValueDelim - bit_get pos - time_trunc unit Document the column-name behavior in each docstring and add a test confirming a bare str resolves to a per-row column value. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a PySpark-compatible function namespace to datafusion-python, along with SQL opt-in registration hooks and documentation/tests to support porting PySpark workloads.
Changes:
- Introduces
datafusion.functions.sparkPython wrappers backed by new Rust PyO3 bindings to thedatafusion-sparkcrate. - Adds
SessionContext.enable_spark_functions()to register Spark UDFs/UDAFs/UDWFs for SQL (overriding built-ins by name). - Adds user-guide/skill docs plus a new Python test suite covering key semantics and SQL enablement.
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
python/datafusion/functions/spark.py |
New public PySpark-compatible function surface and __all__ export list. |
crates/core/src/spark_functions.rs |
New PyO3 bindings exposing datafusion-spark builders under _internal.functions.spark. |
python/datafusion/context.py |
Adds SessionContext.enable_spark_functions() Python API for SQL registration. |
crates/core/src/context.rs |
Implements Rust-side registration of all datafusion-spark scalar/aggregate/window functions. |
crates/core/src/lib.rs |
Registers the new internal functions.spark submodule. |
python/tests/test_spark_functions.py |
Adds unit tests for selected Spark wrappers and SQL enablement behavior. |
docs/source/user-guide/common-operations/spark-functions.md |
New user guide page explaining Spark namespace and SQL enablement. |
docs/source/user-guide/common-operations/index.md |
Adds spark-functions page to the common operations TOC. |
docs/source/user-guide/common-operations/functions.md |
Adds a note linking to Spark-compatible functions. |
docs/source/user-guide/common-operations/aggregations.md |
Documents Spark-compatible aggregate functions and links to the catalog. |
skills/datafusion_python/SKILL.md |
Documents the new functions.spark namespace and semantic divergences. |
python/datafusion/functions.py |
Attempts to expose spark under the datafusion.functions namespace (__all__). |
crates/core/Cargo.toml / Cargo.toml |
Adds datafusion-spark dependency wiring. |
pyproject.toml |
Excludes the new Spark tests from codespell scanning. |
.ai/skills/*/SKILL.md |
Updates internal skills docs to reflect the new spark namespace and file layout. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| FileCompressionType::from_str(&file_compression_type.unwrap_or_default()).map_err(|_| { | ||
| PyValueError::new_err("file_compression_type must one of: gzip, bz2, xz, zstd") | ||
| }) |
|
This is amazing! |
| use crate::expr::window::PyWindowFrame; | ||
|
|
||
| fn add_builder_fns_to_aggregate( | ||
| pub(crate) fn add_builder_fns_to_aggregate( |
| m.add_wrapped(wrap_pyfunction!(try_url_decode))?; | ||
| m.add_wrapped(wrap_pyfunction!(url_encode))?; | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Curious if this would eventually needs to be fetched dynamically ? (I dont think this is a blocker for now)
|
Thank you @timsaucer |
|
Resolved most of the Copilot issues because they were incorrectly evaluated. Each of those parameters match the pyspark expectation for column names NOT the pattern in this repo of expecting literals. The goal in this module is to match pyspark as faithfully as possible. |
"must one of" → "must be one of". Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Which issue does this PR close?
Closes #1482
Rationale for this change
This expands the pool of available functions for users. Some replace existing functions and others are new.
What changes are included in this PR?
Are there any user-facing changes?
No existing functions are impacted. New APIs and functions exposed.